Pulling system data and metrics from Qumulo

This is an unsupported and highly experimental approach to pulling metrics and system data from a Qumulo cluster via its REST API.


In [ ]:
import os
import requests
import json
import time
import pprint
import re
from collections import OrderedDict
# Python + SSL on macOS is rather noisy against dev clusters with self-signed
# certificates, so silence the urllib3 warnings triggered by verify=False.
requests.packages.urllib3.disable_warnings()

In [ ]:
FULL_KEYS = OrderedDict([
    ('ad_base_ldap_client--request_latency_calls', {'name':'active_directory~op_count'}),
    ('ad_base_ldap_client--request_latency_total_time_usec', {'name':'active_directory~msec', 'divide':1000}),

    ('trans_stats-name:fs_create_file-calls', {'name':'file_create~op_count'}),
    ('trans_stats-name:fs_create_file-time_sum', {'name':'file_create~msec', 'divide':1000}),
    ('trans_stats-name:fs_rename-calls', {'name':'file_rename~op_count'}),
    ('trans_stats-name:fs_rename-time_sum', {'name':'file_rename~msec', 'divide':1000}),
    ('trans_stats-name:fs_unlink-calls', {'name':'file_delete~op_count'}),
    ('trans_stats-name:fs_unlink-time_sum', {'name':'file_delete~msec', 'divide':1000}),

    ('trans_stats-name:fs_read-data_read_hdd_misses', {'name':'file_read_cache_miss~bytes', 'divide':1.0/4096}),
    ('trans_stats-name:fs_read-data_read_ssd_misses', {'name':'file_read_cache_miss~bytes', 'divide':1.0/4096}),
    ('trans_stats-name:fs_read-data_read_disk_cache_hits', {'name':'file_read_from_ram~bytes', 'divide':1.0/4096}),
    ('trans_stats-name:fs_read-data_read_trans_cache_hits', {'name':'file_read_from_ram~bytes', 'divide':1.0/4096}),
    ('trans_stats-name:fs_read-data_read_on_hdd', {'name':'file_read_from_hdd~bytes', 'divide':1.0/4096}),
    ('trans_stats-name:fs_read-data_read_on_ssd', {'name':'file_read_from_ssd~bytes', 'divide':1.0/4096}),

    ('global_perf_counters--snapshots:garbage_collector:blocks_deleted', {'name':'snapshot_deleted_data~bytes', 'divide':1.0/4096}),
    ('global_perf_counters--snapshots:garbage_collector:inodes_deleted', {'name':'snapshot_deleted_inodes~count'}),
    ('global_perf_counters--snapshots:garbage_collector:snapshots_deleted', {'name':'snapshot_deleted_snapshots~count'}),
    ('global_perf_counters--fs:statistics:deferred_bytes_deleted', {'name':'deleted_deferred~bytes'}),
    ('global_perf_counters--fs:statistics:snapshot_bytes_deleted', {'name':'deleted_snapshot~bytes'}),
    ('global_perf_counters--tree_delete:adapter:unlinked_count', {'name':'tree_delete_files_deleted~count'}),
    ('global_perf_counters--tree_delete:deleter:tree_delete_count', {'name':'tree_delete~op_count'}),
    ('global_perf_counters--fs:prefetcher:contiguous_policy_prefetch_waste', {'name':'prefetch_waste~bytes'}),
    ('global_perf_counters--fs:prefetcher:contiguous_policy_prefetched', {'name':'prefetch_success~bytes'}),
    ('global_perf_counters--fs:prefetcher:contiguous_policy_read_not_predicted', {'name':'prefetch_missed~bytes'}),
    ('global_perf_counters--fs:prefetcher:contiguous_policy_wasted_lookups', {'name':'prefetch_file_next_waste~count'}),
    ('global_perf_counters--fs:prefetcher:next_file_confirmed', {'name':'prefetch_file_next_success~count'}),

    ('public_audit_logger--bytes_sent', {'name':'audit_sent~bytes'}),
    ('public_audit_logger--messages_buffered', {'name':'audit_buffered~op_count'}),
    ('public_audit_logger--messages_dropped_due_to_full_buffer', {'name':'audit_dropped~op_count'}),
    ('public_audit_logger--messages_dropped_due_to_full_socket', {'name':'audit_dropped~op_count'}),
    ('public_audit_logger--messages_sent', {'name':'audit_sent~op_count'}),
    ('audit_log_deduper--duplicate_operations_seen', {'name':'audit_dedup~op_count'}),
])

RX_KEYS = OrderedDict([
    ('trans_stats.*-meta_read_on_hdd', {'name':'metadata_read_from_hdd~bytes', 'divide':1.0/4096}),
    ('trans_stats.*-meta_read_on_ssd', {'name':'metadata_read_from_ssd~bytes', 'divide':1.0/4096}),
    ('elevator_user.*type:DEVICE_MEDIA_TYPE_SPINNING_DISK&user:DEFAULT_CLASS-bytes_written', {'name': 'drive_hdd_write~bytes'}),
    ('elevator_user.*type:DEVICE_MEDIA_TYPE_SSD&user:DEFAULT_CLASS-bytes_written', {'name': 'drive_ssd_write~bytes'}),
    ('elevator_user.*type:DEVICE_MEDIA_TYPE_SPINNING_DISK&user:DEFAULT_CLASS-bytes_read', {'name': 'drive_hdd_read~bytes'}),
    ('elevator_user.*type:DEVICE_MEDIA_TYPE_SSD&user:DEFAULT_CLASS-bytes_read', {'name': 'drive_ssd_read~bytes'}),

    ('elevator_user.*type:DEVICE_MEDIA_TYPE_SPINNING_DISK&user:DEFAULT_CLASS-.*latency_calls', {'name': 'drive_hdd~op_count'}),
    ('elevator_user.*type:DEVICE_MEDIA_TYPE_SSD&user:DEFAULT_CLASS-.*latency_calls', {'name': 'drive_ssd~op_count'}),
    ('elevator_user.*type:DEVICE_MEDIA_TYPE_SPINNING_DISK&user:DEFAULT_CLASS-.*latency_total_time_usec', {'name': 'drive_hdd~msec', 'divide':1000}),
    ('elevator_user.*type:DEVICE_MEDIA_TYPE_SSD&user:DEFAULT_CLASS-.*latency_total_time_usec', {'name': 'drive_ssd~msec', 'divide':1000}),

    ('network_interface-interface:eth.*&state:active&network:both-dropped_packets', {'name': 'network_dropped_packets_node%s~op_count'}),
    ('network_interface-interface:eth.*&state:active&network:both-frame_errors', {'name': 'network_frame_errors_node%s~op_count'}),

])
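
The mappings above flatten each counter returned by the `/v1/metrics/` endpoint into a single string of the form `measurement-tag1:val1&tag2:val2-field`, which is first looked up directly in FULL_KEYS and then matched against the regular expressions in RX_KEYS. Note that a `divide` of `1.0/4096` effectively multiplies a 4 KiB block count up to bytes. A minimal sketch of the flattening and lookup against these tables, using a made-up sample counter (the field value is illustrative, not from a live cluster):

In [ ]:
import re

# Hypothetical counter in the shape returned by /v1/metrics/?node-id=N.
sample = {
    'measurement': 'trans_stats',
    'tags': {'name': 'fs_create_file'},
    'fields': {'calls': '42'},
}

tags = '&'.join("%s:%s" % (k, v) for k, v in sample['tags'].items())
for field, val in sample['fields'].items():
    key = "%s-%s-%s" % (sample['measurement'], tags, field)
    if key in FULL_KEYS:
        kd = FULL_KEYS[key]
        print(kd['name'], int(val) / kd.get('divide', 1))
    else:
        for rx, kd in RX_KEYS.items():
            if re.match(rx, key):
                print(kd['name'], int(val) / kd.get('divide', 1))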

In [ ]:
class QumuloMetrics:
    def __init__(self, host, username, password):
        self.metrics = {}
        self.ROOT_URL = 'https://' + host + ':8000'
        self.URLS = {
            'login': '/v1/session/login',
            'drive_status': '/v1/cluster/slots/',
            'node_status': '/v1/cluster/nodes/',
            'node_hw': '/v1/cluster/nodes/chassis/',
            'capacity': '/v1/file-system',
            'root_aggregate': '/v1/files/%2F/aggregates/?max-entries=0',
            'metrics': '/v1/metrics/?node-id=%s',
            'network_status': '/v2/network/interfaces/%s/status/',
            'protocol_connections': '/v2/network/connections/',
            'restriper_status': '/v1/cluster/restriper/status',
            'snapshots': '/v2/snapshots/status/',
            'replication_source': '/v2/replication/source-relationships/status/',
            'replication_target': '/v2/replication/target-relationships/status/',
            'quotas_status': '/v1/files/quotas/status/?limit=2',
            'smb_share_status': '/v2/smb/shares/',
            'nfs_export_status': '/v2/nfs/exports/',
            'roles': '/v1/auth/roles/',
            'role_members': '/v1/auth/roles/%s/members',
            'snapshot_policies': '/v1/snapshots/policies/status/',
        }
        for k, url in self.URLS.items():
            self.URLS[k] = self.ROOT_URL + url
        self.header = {'content-type': 'application/json'}
        resp = requests.post(self.URLS['login'], 
                          data=json.dumps({'username': username, 'password': password}), 
                          headers=self.header, 
                          verify=False)
        resp_data = json.loads(resp.text)
        self.header['Authorization'] = 'Bearer ' + resp_data['bearer_token']
        resp = requests.get(self.URLS['node_status'], headers=self.header, verify=False)
        self.nodes = json.loads(resp.text)
        
    def add_metric(self, k, v=0, replace=False):
        if k not in self.metrics or replace:
            self.metrics[k] = v
        else:
            self.metrics[k] += v

    def get_metrics_inventory(self):
        data = {}
        for node in self.nodes:
            resp = requests.get(self.URLS["metrics"] % node['id'], headers=self.header, verify=False)
            node_metrics = json.loads(resp.text)
            for node_metric in node_metrics:
                for name, val in node_metric['fields'].items():
                    measure = node_metric['measurement']
                    tags = '&'.join(["%s:%s" % (k, v) for k, v in node_metric['tags'].items()])
                    key = "%s\t%s\t%s\t%s-%s-%s" % (measure, tags, name, measure, tags, name)
                    if key not in data:
                        data[key] = float(val)
                    else:
                        data[key] += float(val)
        return data

            
    def get_metrics(self, full_keys, rx_keys):
        self.metrics = {}
        for node in self.nodes:
            resp = requests.get(self.URLS["metrics"] % node['id'], headers=self.header, verify=False)
            node_metrics = json.loads(resp.text)
            for node_metric in node_metrics:
                full_key = node_metric['measurement'] + "-" + '&'.join(["%s:%s" % (k, v) for k, v in node_metric['tags'].items()])
                fields = node_metric['fields']
                if node_metric['measurement'] == 'protocol_op':
                    op = node_metric['tags']['op_name']
                    protocol = op[0:3].lower()
                    op_short = 'write' if re.match(".*(WRITE|SET|CREATE).*", op) else 'read'
                    op_type = 'metadata'
                    if int(fields['total_bytes']) > 0:
                        self.add_metric('file_%s_data~msec' % (op_short), int(fields['total_latency_total_time_usec'])/1000.0)
                        self.add_metric('file_%s_data~bytes' % op_short, int(fields['total_bytes']))
                        self.add_metric('file_%s_data~op_count' % (op_short), int(fields['total_latency_calls']))
                        op_type = 'data'
                    self.add_metric('file_%s_%s_%s~op_count' % (op_short, op_type, protocol), int(fields['total_latency_calls']))
                    self.add_metric('file_%s_%s_%s~msec' % (op_short, op_type, protocol), int(fields['total_latency_total_time_usec'])/1000.0)
                    if op_type == 'data':
                        self.add_metric('file_%s_%s_%s~bytes' % (op_short, op_type, protocol), int(fields['total_bytes']))
                        self.add_metric('file_total_%s_node%s~bytes' % (op_short, "%03d" % node['id']), int(fields['total_bytes']))

                    self.add_metric('file_total_node%s~msec' % ("%03d" % node['id']), int(fields['total_latency_total_time_usec'])/1000.0)
                    self.add_metric('file_total_node%s~op_count' % ("%03d" % node['id']), int(fields['total_latency_calls']))

                    for name, val in fields.items():
                        if 'io_size_bucket_' in name:
                            bytes_num = 1048576
                            if int(name.split('_')[3]) <= 4096:
                                bytes_num = 4096
                            elif int(name.split('_')[3]) <= 65536:
                                bytes_num = 65536
                            self.add_metric('file_%s_data_%07dbytes~op_count' % (op_short, bytes_num), int(val))

                for name, val in fields.items():
                    k = full_key + "-" + name
                    if k in full_keys:
                        kd = full_keys[k]
                        self.add_metric(kd['name'], int(val) / (1 if 'divide' not in kd else kd['divide']))
                    else:
                        for rx, kd in rx_keys.items():
                            if re.match(rx, k):
                                if 'node' in kd['name']:
                                    self.add_metric(kd['name'] % ("%03d" % node['id']), int(val) / (1 if 'divide' not in kd else kd['divide']))
                                else:
                                    self.add_metric(kd['name'], int(val) / (1 if 'divide' not in kd else kd['divide']))

        ###############   network status   ###############
        resp = requests.get(self.URLS['network_status'] % 1, headers=self.header, verify=False)
        for node in json.loads(resp.text):
            self.add_metric('network_mbit_node%03d~speed' % int(node["node_id"]), 
                            node['interface_details']['speed'], True)
            self.add_metric('network_recv_node%03d~bytes' % int(node["node_id"]), 
                            node['interface_details']['bytes_received'], True)
            self.add_metric('network_sent_node%03d~bytes' % int(node["node_id"]), 
                            node['interface_details']['bytes_sent'], True)

        ###############   restriper status   ###############
        resp = requests.get(self.URLS['restriper_status'], headers=self.header, verify=False)
        d = json.loads(resp.text)
        self.add_metric('restriper_elapsed_time~seconds', 
                        d['elapsed_seconds'], True)
        self.add_metric('restriper_time_left~seconds', 
                        d['estimated_seconds_left'], True)

        ###############   drive status   ###############
        resp = requests.get(self.URLS['drive_status'], headers=self.header, verify=False)
        for d in json.loads(resp.text):
            self.add_metric('drive_%s_healthy~count' % d['disk_type'].lower(), 1 if d['state'] == 'healthy' else 0)
            self.add_metric('drive_%s_unhealthy~count' % d['disk_type'].lower(), 1 if d['state'] != 'healthy' else 0)

        ###############   node status   ###############
        resp = requests.get(self.URLS['node_status'], headers=self.header, verify=False)
        for d in json.loads(resp.text):
            self.add_metric('node_online~count', 1 if d['node_status'] == 'online' else 0)
            self.add_metric('node_unhealthy~count', 1 if d['node_status'] != 'online' else 0)

        ###############   psu status   ###############
        resp = requests.get(self.URLS['node_hw'], headers=self.header, verify=False)
        for n in json.loads(resp.text):
            for d in n['psu_statuses']:
                self.add_metric('psu_healthy~count' , 1 if d['state'] == 'GOOD' else 0)
                self.add_metric('psu_unhealthy~count' , 1 if d['state'] != 'GOOD' else 0)

        ###############   capacity    ###############
        resp = requests.get(self.URLS['root_aggregate'], headers=self.header, verify=False)
        d = json.loads(resp.text)
        self.add_metric('capacity_directories~count', int(d['total_directories']))
        self.add_metric('capacity_files~count', int(d['total_files']))
        self.add_metric('capacity_other~count', int(d['total_other_objects']) + int(d['total_symlinks']))
        self.add_metric('capacity_streams~count', int(d['total_named_streams']))
        
        resp = requests.get(self.URLS['capacity'], headers=self.header, verify=False)
        d = json.loads(resp.text)
        self.add_metric('capacity_free~bytes', int(d['free_size_bytes']))
        self.add_metric('capacity_snapshots~bytes', int(d['snapshot_size_bytes']))
        self.add_metric('capacity_usable~bytes', int(d['total_size_bytes']))
        self.add_metric('capacity_used~bytes', int(d['total_size_bytes']) - int(d['free_size_bytes']))

        ###############   snapshots   ###############
        resp = requests.get(self.URLS['snapshots'], headers=self.header, verify=False)
        d = json.loads(resp.text)
        self.add_metric('snapshot_root~count', 0)
        for snap in d['entries']:
            if snap['source_file_path'] == '/':
                self.add_metric('snapshot_root~count', 1)
            if snap['expiration'] == '':
                self.add_metric('snapshot_no_expiration~count', 1)
            self.add_metric('snapshot_total~count', 1)
        resp = requests.get(self.URLS["snapshot_policies"], headers=self.header, verify=False)
        d = json.loads(resp.text)
        for snap in d['entries']:
            self.add_metric('snapshot_policies_total~count', 1)
            if snap['enabled']:
                self.add_metric('snapshot_policies_enabled_total~count', 1)

        ###############   replication   ###############            
        for repl_type in ['source', 'target']:
            resp = requests.get(self.URLS['replication_%s' % repl_type], headers=self.header, verify=False)
            dd = json.loads(resp.text)
            for d in dd:
                self.add_metric('replication_%s_relationship~count' % repl_type, 1)
                if d['state'] != 'ESTABLISHED':
                    self.add_metric('replication_%s_relationship_unestablished~count' % repl_type, 1)
                if d['job_state'] != 'REPLICATION_NOT_RUNNING':
                    self.add_metric('replication_%s_relationship_active~count' % repl_type, 1)
                    if d['replication_job_status'] is not None:
                        self.add_metric('replication_%s_active_files_remaining~count' % repl_type, 
                                        int(d['replication_job_status']['files_remaining']))
                        self.add_metric('replication_%s_active_throughput~bytepersec' % repl_type, 
                                        int(d['replication_job_status']['throughput_current']))

        ###############   quotas   ###############        
        next_url = self.URLS["quotas_status"]
        while next_url != '':
            resp = requests.get(next_url, headers=self.header, verify=False)
            d = json.loads(resp.text)
            if 'quotas' not in d:
                next_url = ''
                break
            for q in d['quotas']:
                self.add_metric('quotas~count', 1)
                if int(q['capacity_usage']) >= int(q['limit']):
                    self.add_metric('quotas_exceeded~count', 1)
                if int(q['capacity_usage']) >= int(q['limit']) * 0.95:
                    self.add_metric('quotas_exceeded_95~count', 1)
            next_url = self.ROOT_URL + d['paging']['next'] if d['paging']['next'] else ''

        ###############   smb shares   ###############        
        resp = requests.get(self.URLS["smb_share_status"], headers=self.header, verify=False)
        d = json.loads(resp.text)
        for share in d:
            self.add_metric('smb_share~count', 1)
            if share['require_encryption']:
                self.add_metric('smb_share_encrypted~count', 1)
            writable = False
            for p in share['permissions']:
                if 'WRITE' in p['rights']:
                    writable = True
            if not writable:
                self.add_metric('smb_share_not_writable~count', 1)

        ###############   nfs exports   ###############        
        resp = requests.get(self.URLS["nfs_export_status"], headers=self.header, verify=False)
        d = json.loads(resp.text)
        for share in d:
            self.add_metric('nfs_exports~count', 1)
            if share['restrictions'][0]['read_only']:
                self.add_metric('nfs_exports_read_only~count', 1)
            if len(share['restrictions'][0]['host_restrictions']) > 0:
                self.add_metric('nfs_exports_has_host_restrictions~count', 1)
            if len(share['restrictions']) > 1:
                self.add_metric('nfs_exports_multiple_restrictions~count', 1)

        ###############   roles   ###############        
        resp = requests.get(self.URLS["roles"], headers=self.header, verify=False)
        d = json.loads(resp.text)
        self.add_metric('roles_total~count', len(d))
        resp = requests.get(self.URLS["role_members"] % 'Administrators', headers=self.header, verify=False)
        d = json.loads(resp.text)
        self.add_metric('roles_members_administrators~count', len(d['members']))

        ###############   protocol_connections   ###############        
        resp = requests.get(self.URLS["protocol_connections"], headers=self.header, verify=False)
        d = json.loads(resp.text)
        for cc in d:
            for c in cc['connections']:
                self.add_metric('protocol_connection~count', 1)
                self.add_metric('protocol_connection_%s~count' % c['type'][-3:].lower() , 1)

        return self.metrics
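
Before hard-coding new entries in FULL_KEYS or RX_KEYS, it can be useful to dump every counter the cluster actually exposes. A possible usage sketch of the inventory helper (the host and credentials below are placeholders):

In [ ]:
# Dump the raw counter inventory so candidate keys for FULL_KEYS / RX_KEYS
# can be picked out by hand. Host and credentials are placeholders.
qm = QumuloMetrics('test.test.com', 'admin', '****')
for key, val in sorted(qm.get_metrics_inventory().items()):
    print("%s\t%s" % (key, val))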

In [ ]:
creds = [
        ['test.test.com', 'admin', '****'],
        ]

# Sample the counters twice, roughly a minute apart, and write the delta for
# each metric to a tab-separated summary file, one row per cluster and metric.
with open("metrics-summary.txt", "w") as fw:
    for host, username, password in creds:
        qm = QumuloMetrics(host, username, password)
        m0 = qm.get_metrics(FULL_KEYS, RX_KEYS)
        time.sleep(55)
        m1 = qm.get_metrics(FULL_KEYS, RX_KEYS)
        for k, v in sorted(m1.items()):
            fw.write("%s\t%s\t%s\t%s\t%s\n" % (host, k, int(v), int(m0.get(k, 0)), int(v) - int(m0.get(k, 0))))
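
Because the two samples are taken roughly 55 seconds apart, dividing each delta by the sampling interval gives an approximate per-second rate. A minimal sketch, assuming m0 and m1 from the cell above and treating the nominal 55-second sleep as the elapsed time (the real interval is slightly longer, since each collection pass takes time):

In [ ]:
# Convert counter deltas into rough per-second rates for the last cluster
# sampled above. The 55-second figure is the nominal sleep, not a measured
# elapsed time, so the rates are approximate.
interval_sec = 55.0
for k in sorted(m1):
    delta = m1[k] - m0.get(k, 0)
    print("%s\t%.2f/sec" % (k, delta / interval_sec))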